package org.databandtech.job.jobs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.databandtech.job.entity.SavableTaskJob;
import org.databandtech.job.entity.SaveDestination;
import org.databandtech.job.entity.ScheduledTaskJob;
import org.databandtech.job.utils.SaveDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduled job that runs a Hive SQL query over JDBC and stores the returned
 * result set.
 *
 * <p>Each call to {@link #run()} opens a new connection using {@code conStr},
 * executes {@code sql}, and passes the resulting {@link ResultSet} together
 * with {@code dest} to {@link #SaveData(ResultSet, SaveDestination)}.
 */
public class HiveSqlQueryJob implements ScheduledTaskJob,SavableTaskJob<ResultSet>{

	private static final Logger LOGGER = LoggerFactory.getLogger(HiveSqlQueryJob.class);

	/** Fully-qualified class name of the Hive JDBC driver loaded before connecting. */
	private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";

	private String key;
	private String cron;
	/** Static map mapping used when saving into fields; only stored by this class, not read in {@link #run()}. */
	private String staticFieldValueMap;
	/** JDBC connection string, e.g. {@code "jdbc:hive2://192.168.13.200:10000/default"}. */
	private String conStr;
	/** Query to execute, e.g. {@code "select * from ..."}. */
	private String sql;
	/** Metadata describing where the query result is saved. */
	private SaveDestination dest;

	/**
	 * Creates a job with all of its settings.
	 *
	 * @param key                 job identifier, included in log output
	 * @param cron                cron expression stored with the job
	 * @param staticFieldValueMap static field/value mapping stored with the job
	 * @param conStr              JDBC connection string passed to {@link DriverManager}
	 * @param sql                 query executed by {@link #run()}
	 * @param dest                save-destination metadata handed to {@link #SaveData(ResultSet, SaveDestination)}
	 */
	public HiveSqlQueryJob(String key, String cron, String staticFieldValueMap, String conStr, String sql,
			SaveDestination dest) {
		this.key = key;
		this.cron = cron;
		this.staticFieldValueMap = staticFieldValueMap;
		this.conStr = conStr;
		this.sql = sql;
		this.dest = dest;
	}

	public String getConStr() {
		return conStr;
	}

	public void setConStr(String conStr) {
		this.conStr = conStr;
	}

	public String getSql() {
		return sql;
	}

	public void setSql(String sql) {
		this.sql = sql;
	}

	public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = key;
	}

	public String getCron() {
		return cron;
	}

	public void setCron(String cron) {
		this.cron = cron;
	}

	public SaveDestination getDest() {
		return dest;
	}

	public void setDest(SaveDestination dest) {
		this.dest = dest;
	}

	public String getStaticFieldValueMap() {
		return staticFieldValueMap;
	}

	public void setStaticFieldValueMap(String staticFieldValueMap) {
		this.staticFieldValueMap = staticFieldValueMap;
	}

	/**
	 * Executes the configured query and stores its result.
	 *
	 * <p>The connection, statement and result set are closed when the method
	 * finishes, whether or not the query or the save step succeeded. Any
	 * exception (driver not found, SQL error, failure while saving or closing)
	 * is logged and not rethrown, so this method never propagates a failure to
	 * its caller.
	 */
	@Override
	public void run() {
		try {
			Class.forName(HIVE_DRIVER);
			// try-with-resources closes rs, stmt and con in reverse order, even on failure.
			try (Connection con = DriverManager.getConnection(conStr, "", "");
					Statement stmt = con.createStatement();
					ResultSet rs = stmt.executeQuery(sql)) {
				SaveData(rs, dest); // persist the result data
				LOGGER.info("HiveSqlQueryJob => {}  run  当前线程名称 {} ", key, Thread.currentThread().getName());
			}
		} catch (Exception ex) {
			// Broad catch is deliberate: the original contract is that run() never throws.
			LOGGER.error("HiveSqlQueryJob => {} failed", key, ex);
		}
	}

	/**
	 * Stores the given result set by delegating to {@link SaveDataUtils#save}.
	 *
	 * @param t    result set to store
	 * @param dest save-destination metadata
	 * @return whatever {@link SaveDataUtils#save} returns
	 */
	@Override
	public String SaveData(ResultSet t,SaveDestination dest) {
		return SaveDataUtils.save(t,dest);
	}

}
